package net.sina.realtime.traffic.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.sina.realtime.traffic.bean.TrafficEvent;
import org.apache.flink.api.common.functions.RichMapFunction;

public class TrafficMapFunction extends RichMapFunction<String, TrafficEvent> {

    @Override
    public TrafficEvent map(String s) throws Exception {
        JSONObject jsonObject = JSON.parseObject(s);

        TrafficEvent trafficEvent = new TrafficEvent();
        trafficEvent.setEventId(jsonObject.getString("event_id"));
        trafficEvent.setCameraId(jsonObject.getString("camera_id"));
        trafficEvent.setLicensePlate(jsonObject.getString("license_plate"));
        trafficEvent.setVehicleType(jsonObject.getString("vehicle_type"));
        trafficEvent.setRoadId(jsonObject.getString("road_id"));
        trafficEvent.setCrossingId(jsonObject.getString("crossing_id"));
        trafficEvent.setTs(jsonObject.getLong("timestamp"));
        trafficEvent.setSpeed(jsonObject.getDouble("speed"));
        trafficEvent.setDirection(jsonObject.getString("direction"));
        trafficEvent.setLane(jsonObject.getInteger("lane"));
        trafficEvent.setEventType(jsonObject.getString("event_type"));
        trafficEvent.setAreaId(jsonObject.getString("area_id"));

        return trafficEvent;
    }
}
